标签
分布式锁
字数
2130 字
阅读时间
11 分钟
分布式锁
java
import com.commnetsoft.commons.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultScriptExecutor;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
/**
* 缓存相关
*
* @author Brack.zhu
* @date 2020/7/3
*/
@Component
public class CacheHelper {
@Value("#{coreConfig.applicationName}")
private String applicationName;
@Autowired
private RedisTemplate redisTemplate;
/**
 * Makes one attempt to acquire the distributed lock, without waiting.
 * <p>
 * Redis key format: {@code {applicationName}:lock:{key}}. The value is written only if the key
 * does not exist yet (NX), together with the given time-to-live.
 *
 * @param key        lock key; an empty key is rejected
 * @param lockFactor lock factor, the token that identifies the owner for lock and unlock
 * @param expireTime time-to-live of the lock
 * @param timeUnit   unit of {@code expireTime}
 * @return {@code true} only if the key was set by this call; {@code false} if the key is empty,
 *         the lock is already held, or Redis returned no result
 */
public boolean tryLock(String key, String lockFactor, long expireTime, TimeUnit timeUnit) {
    if (!StringUtils.isNotEmpty(key)) {
        return false;
    }
    String realKey = buildRealKey(key);
    // NX: the key is only set when it does not exist yet.
    // setIfAbsent returns a Boolean that can be null (documented for pipeline / transaction use),
    // so it must not be auto-unboxed.
    Boolean acquired = redisTemplate.opsForValue().setIfAbsent(realKey, lockFactor, expireTime, timeUnit);
    return Boolean.TRUE.equals(acquired);
}
/**
 * Releases the distributed lock, but only if it is still held with the given lock factor.
 * <p>
 * Redis key format: {@code {applicationName}:lock:{key}}, the same key that {@code tryLock} writes.
 *
 * @param key        lock key; an empty key is rejected
 * @param lockFactor lock factor that was used when the lock was acquired
 * @return {@code true} if the key was deleted by this call; {@code false} if the key is empty,
 *         the stored value does not match {@code lockFactor}, or the script returned no result
 */
public boolean unLock(String key, String lockFactor) {
    if (!StringUtils.isNotEmpty(key)) {
        return false;
    }
    String realKey = buildRealKey(key);
    // Redis runs all scripts on a single Lua interpreter and guarantees that a script executes
    // atomically, so the compare (GET) and the delete (DEL) cannot be interleaved with other commands.
    String scriptStr = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    RedisScript<Boolean> script = RedisScript.of(scriptStr, Boolean.class);
    DefaultScriptExecutor defaultScriptExecutor = new DefaultScriptExecutor(redisTemplate);
    Object rs = defaultScriptExecutor.execute(script, Collections.singletonList(realKey), lockFactor);
    // The result may be null; compare instead of unboxing to avoid a NullPointerException.
    return Boolean.TRUE.equals(rs);
}
/**
 * Renews the time-to-live of the distributed lock, but only if it is still held with the given
 * lock factor.
 * <p>
 * Redis key format: {@code {applicationName}:lock:{key}}, the same key that {@code tryLock} writes.
 *
 * @param key            lock key; an empty key is rejected
 * @param lockFactor     lock factor that was used when the lock was acquired
 * @param expirationTime new time-to-live in seconds; it is handed to Redis {@code EXPIRE} as is,
 *                       so callers should pass a positive value
 * @return {@code true} if the expiry was updated; {@code false} if the key is empty, the stored
 *         value does not match {@code lockFactor}, or the script returned no result
 */
public boolean lockRenewExpiration(String key, String lockFactor, int expirationTime) {
    if (!StringUtils.isNotEmpty(key)) {
        return false;
    }
    String realKey = buildRealKey(key);
    // Redis runs all scripts on a single Lua interpreter and guarantees that a script executes
    // atomically, so the compare (GET) and the EXPIRE cannot be interleaved with other commands.
    // NOTE(review): ARGV values go through the template's serializers, which are not visible here;
    // confirm that expirationTime reaches Redis as plain digits.
    String scriptStr = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
    RedisScript<Boolean> script = RedisScript.of(scriptStr, Boolean.class);
    DefaultScriptExecutor defaultScriptExecutor = new DefaultScriptExecutor(redisTemplate);
    Object rs = defaultScriptExecutor.execute(script, Collections.singletonList(realKey), lockFactor, expirationTime);
    // The result may be null; compare instead of unboxing to avoid a NullPointerException.
    return Boolean.TRUE.equals(rs);
}
/**
 * Builds the Redis key used for a lock: {@code applicationName + ":lock:" + key}.
 *
 * @param key caller-supplied lock key
 * @return the prefixed Redis key
 */
private String buildRealKey(String key) {
    StringBuilder realKey = new StringBuilder();
    realKey.append(applicationName);
    realKey.append(":lock:");
    realKey.append(key);
    return realKey.toString();
}
}
分布式锁注解
java
/**
* 分布式锁注解<br/>
* <blockquote><pre>
* {@code
* Controller{
*
* @Autowired
* private Service service
*
* @GetMapping
* Result<Void> cacheTest(){
* service.bMethod1();
* service.bMethod2();
* }
* }
*
* Service{
*
* //Spring代理调用注解有效
* @DistLock
* bMethod1() {
* //业务逻辑
* }
*
* bMethod2(){
* //注解无效,未经过Spring代理调用AOP拦截不到
* bInnerMethod();
* //解决方案:通过Spring代理调用注解才能有效
* SpringContextUtil.getBean(Service.class).bInnerMethod();
* //业务逻辑
* }
*
* @DistLock
* bInnerMethod(){
* //业务逻辑
* }
* }
* }
* </pre></blockquote>
*
* @author Brack.zhu
* @date 2021/1/19
*/
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistLock {
/**
* Distributed lock key. Supports Spring Expression Language (SpEL), for example {@code #name}.<br/>
*
* @return the lock key expression
*/
String value();
}
分布式注解切面
java
import com.commnetsoft.core.CoreConstant;
import com.commnetsoft.core.annotation.DistLock;
import com.commnetsoft.core.utils.AopUtil;
import com.commnetsoft.core.utils.SpElUtil;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.expression.EvaluationContext;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
/**
* 分布式锁AOP切面类
*
* @author Brack.zhu
* @date 2021/1/19
*/
@Aspect
@Component
public class DistLockAspect {
@Value("#{coreConfig.applicationName}")
private String applicationName;
private Logger log = LoggerFactory.getLogger(DistLockAspect.class);
/**
* Pointcut for methods annotated with {@link DistLock}. The method body is intentionally empty;
* the annotation instance is bound to the {@code distLock} parameter for use by the advice.
*
* @param distLock the annotation found on the intercepted method
*/
@Pointcut("@annotation(distLock)")
public void pointcut(DistLock distLock) {
}
/**
 * Around advice that runs the intercepted method while holding a distributed lock.
 * <p>
 * The annotation value is evaluated through {@code SpElUtil} against the method arguments, the
 * result is prefixed by {@code completionLockKey}, and a {@link CacheLock} for that key is taken
 * before the method proceeds. The lock is closed by the try-with-resources statement.
 *
 * @param joinPoint the intercepted invocation
 * @param distLock  the annotation on the intercepted method
 * @return whatever the intercepted method returns
 * @throws Throwable anything thrown while locking or by the intercepted method; instances of
 *                   {@link Exception} are logged and rethrown
 */
@Around(value = "pointcut(distLock)")
public Object distLockAround(ProceedingJoinPoint joinPoint, DistLock distLock) throws Throwable {
    Method joinPointMethod = AopUtil.getMethod(joinPoint);
    EvaluationContext spelContext = SpElUtil.methodBindParam(joinPointMethod, joinPoint.getArgs());
    String lockVal = SpElUtil.evaluate(distLock.value(), spelContext);
    String completionKey = completionLockKey(lockVal);
    try (CacheLock distCacheLock = CacheLock.of(completionKey)) {
        // Parameterized debug calls are skipped by the logger itself when the level is disabled.
        log.debug("方法:{},key:{},开始获取分布式锁...", joinPointMethod, completionKey);
        distCacheLock.lock();
        log.debug("方法:{},key:{},获取分布式锁完成", joinPointMethod, completionKey);
        Object result = joinPoint.proceed(joinPoint.getArgs());
        return result;
    } catch (Exception e) {
        log.error("方法:{},key:{},分布式锁注解处理异常:", joinPointMethod, completionKey, e);
        throw e;
    }
}
/**
 * Builds the complete lock key by prefixing the evaluated lock value with the application name
 * and {@code CoreConstant.Cache.REDIS_KEY_MARK}.
 *
 * @param lockVal evaluated lock value
 * @return the complete lock key
 */
private String completionLockKey(String lockVal) {
    String keyPrefix = applicationName + CoreConstant.Cache.REDIS_KEY_MARK;
    return keyPrefix + lockVal;
}
}
分布式缓存对象
java
import com.commnetsoft.commons.utils.UUIDUtils;
import com.commnetsoft.core.CommonError;
import com.commnetsoft.core.cache.extend.watchdog.LockWatchDogManager;
import com.commnetsoft.core.utils.SpringContextUtil;
import com.commnetsoft.exception.MicroRuntimeException;
import org.apache.commons.lang.math.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* 缓存分布式锁对象<br/>
* 需要手动释放锁,或者使用自动资源释放代码如下<br/>
* <blockquote><pre>
* {@code
* //自动资源释放 JDK1.7以上支持
* try (CacheLock c=CacheLock.of("test")){
* c.lock();
* System.out.println("获取到锁");
* //业务逻辑
* }catch (Exception e){
* e.printStackTrace();
* }
* //try-catch运行完后自动释放锁
* }
* </pre></blockquote>
*
* @author Brack.zhu
* @date 2020/7/3
*/
public class CacheLock implements AutoCloseable {
private String key;
private String lockFactor;
private long expireTime;
private TimeUnit timeUnit;
/**
* 同步获取锁最大等待时间
* -1:无限长
*/
private long syncMaxTime = -1;
/**
* 是否已经锁成功
*/
private volatile boolean isLocked = false;
private CacheHelper cacheHelper = SpringContextUtil.getBean(CacheHelper.class);
private Logger log = LoggerFactory.getLogger(CacheLock.class);
/**
 * Creates a distributed lock object. No Redis call is made here.
 *
 * @param key         lock key
 * @param lockFactor  lock factor, the token that identifies the owner for lock and unlock
 * @param expireTime  time-to-live of the lock
 * @param timeUnit    unit of {@code expireTime}
 * @param syncMaxTime maximum time, in milliseconds, that {@code lock()} waits for the lock;
 *                    values that are not positive make {@code lock()} wait without a time limit
 */
public CacheLock(String key, String lockFactor, long expireTime, TimeUnit timeUnit, long syncMaxTime) {
    // Who owns which lock.
    this.lockFactor = lockFactor;
    this.key = key;
    // How long a blocking acquire may wait.
    this.syncMaxTime = syncMaxTime;
    // How long the lock lives without renewal.
    this.timeUnit = timeUnit;
    this.expireTime = expireTime;
}
/**
 * Makes a single, non-blocking attempt to acquire the distributed lock.
 * <p>
 * A successful attempt is recorded in the locked flag so that {@code unLock()}, {@code close()}
 * and {@code renew()} act on the lock afterwards. Without this, a lock taken through this method
 * could never be released by this object and would stay in Redis until it expired. A failed
 * attempt leaves the flag untouched.
 *
 * @return {@code true} if the lock was acquired by this call
 */
public boolean tryLock() {
    boolean acquired = cacheHelper.tryLock(key, lockFactor, expireTime, timeUnit);
    if (acquired) {
        isLocked = true;
    }
    return acquired;
}
/**
 * Acquires the distributed lock, blocking until it is obtained.
 * <p>
 * Failed attempts are retried after a random pause below 200 ms. When {@code syncMaxTime} is
 * positive and that many milliseconds have elapsed, the wait is abandoned with an exception;
 * otherwise the method waits without a time limit. After the lock is obtained it is registered
 * with the {@link LockWatchDogManager} so that its expiry is renewed.
 * <p>
 * The wait is not interruptible: an interrupt received while pausing is logged and waiting
 * continues, but the thread's interrupt status is restored before this method exits so callers
 * can still observe it.
 *
 * @throws MicroRuntimeException with {@code CommonError.cache_lock_timeout_failed} when the lock
 *                               could not be obtained within {@code syncMaxTime}
 */
public void lock() throws MicroRuntimeException {
    long start = System.currentTimeMillis();
    boolean interrupted = false;
    try {
        while (true) {
            isLocked = tryLock();
            if (isLocked) {
                break;
            }
            if (syncMaxTime > 0 && (System.currentTimeMillis() - start) >= syncMaxTime) {
                throw new MicroRuntimeException(CommonError.cache_lock_timeout_failed);
            }
            int sleep = RandomUtils.nextInt(200);
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException e) {
                // Re-interrupting here would make every following sleep fail at once and turn
                // the loop into a busy spin, so remember the interrupt and restore it on exit.
                interrupted = true;
                log.error("", e);
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
    // Lock acquired: hand it to the watchdog for renewal.
    LockWatchDogManager lockWatchDogManager = SpringContextUtil.getBean(LockWatchDogManager.class);
    if (null == lockWatchDogManager) {
        log.warn("锁的看门狗未设置成功,因为LockWatchDogManager获取失败.{}", this);
    } else {
        lockWatchDogManager.onWatchDog(Thread.currentThread(), this);
    }
}
/**
 * Releases the distributed lock if this object currently holds it.
 * <p>
 * The locked flag is always cleared afterwards, even when the release call fails or throws.
 * A failed release means the key is gone or owned by someone else, and after an exception the
 * lock should be left to expire. Clearing the flag makes {@code renew()} return {@code false},
 * so the watchdog stops renewing, and makes repeated calls no-ops.
 */
public void unLock() {
    if (!isLocked) {
        return;
    }
    try {
        boolean rs = cacheHelper.unLock(key, lockFactor);
        if (!rs) {
            log.warn("释放分布式锁失败,{},{},{},{}", key, lockFactor, expireTime, timeUnit);
        }
    } finally {
        isLocked = false;
    }
}
/**
 * Renews the expiry of the lock to its configured time-to-live.
 * <p>
 * {@code CacheHelper.lockRenewExpiration} takes whole seconds as an {@code int}. The configured
 * time-to-live is converted and clamped to {@code [1, Integer.MAX_VALUE]}: a sub-second value
 * would otherwise become 0, and Redis deletes a key whose expiry is set to 0.
 *
 * @return {@code true} if the lock is held by this object and its expiry was renewed
 */
public boolean renew() {
    if (!isLocked) {
        return false;
    }
    long seconds = timeUnit.toSeconds(expireTime);
    int ttlSeconds = (int) Math.min((long) Integer.MAX_VALUE, Math.max(1L, seconds));
    return cacheHelper.lockRenewExpiration(key, lockFactor, ttlSeconds);
}
/**
* Releases the lock when this object is used in a try-with-resources statement.
* Delegates to {@code unLock()}.
*
* @throws Exception declared in the signature; {@code unLock()} itself declares no checked exception
*/
@Override
public void close() throws Exception {
unLock();
}
/**
 * Returns the default object string followed by {@code ;} and the comma-separated state:
 * locked flag, key, lock factor, expire time, time unit and maximum wait time.
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder(super.toString());
    text.append(';').append(isLocked);
    text.append(',').append(key);
    text.append(',').append(lockFactor);
    text.append(',').append(expireTime);
    text.append(',').append(timeUnit);
    text.append(',').append(syncMaxTime);
    return text.toString();
}
/**
 * Creates a distributed lock with default settings.<br/>
 * <ul>
 * <li>The time-to-live of one lock period is {@code LockWatchDogManager.DISTLOCK_DEF_EXPIRETIME} milliseconds.</li>
 * <li>The wait time for a blocking acquire is -1: under contention the caller waits until the lock is obtained.</li>
 * </ul>
 *
 * @param key distributed lock key, must not be {@code null}
 * @return the distributed lock object
 */
public static CacheLock of(String key) {
    // Same null check, lock factor generation and expiry as the two-argument factory.
    return of(key, -1L);
}
/**
 * Creates a distributed lock with a caller-supplied maximum wait time.<br/>
 * <ul>
 * <li>The time-to-live of one lock period is {@code LockWatchDogManager.DISTLOCK_DEF_EXPIRETIME} milliseconds.</li>
 * <li>The wait time for a blocking acquire is taken from the argument.</li>
 * </ul>
 *
 * @param key         distributed lock key, must not be {@code null}
 * @param syncMaxTime maximum wait time in milliseconds for a blocking acquire under contention;
 *                    -1 waits until the lock is obtained
 * @return the distributed lock object
 */
public static CacheLock of(String key, long syncMaxTime) {
    String lockKey = Objects.requireNonNull(key, "分布式锁主键不能为空");
    // A fresh lock factor per lock object identifies the owner on unlock and renew.
    String lockFactor = UUIDUtils.generate();
    long expireMillis = LockWatchDogManager.DISTLOCK_DEF_EXPIRETIME;
    return new CacheLock(lockKey, lockFactor, expireMillis, TimeUnit.MILLISECONDS, syncMaxTime);
}
}
看门狗
管理类
java
import com.commnetsoft.core.cache.CacheLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.*;
/**
* 分布式锁看门狗管理类
*
* @author Brack.zhu
* @date 2021/1/29
*/
@Component
public class LockWatchDogManager {
private Set<DistLockDogTarget> distLockWatchSet = new HashSet<>();
/**
* 分布式锁默认过期时间,单位毫秒
*/
public final static long DISTLOCK_DEF_EXPIRETIME = 30000;
private LockWatchDog lockWatchDog;
private Logger log = LoggerFactory.getLogger(LockWatchDogManager.class);
/**
 * Creates and starts the watchdog thread once the bean has been constructed.
 * The patrol interval is one third of the default lock expiry.
 */
@PostConstruct
public void postConstruct() {
    long patrolInterval = DISTLOCK_DEF_EXPIRETIME / 3;
    LockWatchDog dog = new LockWatchDog(this, patrolInterval);
    lockWatchDog = dog;
    dog.start();
}
/**
* Asks the watchdog thread to stop when the bean is destroyed.
*/
@PreDestroy
public void preDestroy() {
lockWatchDog.callStop();
}
/**
 * Registers a held lock with the watchdog so that its expiry is renewed on each patrol.
 * <p>
 * Called from the threads that acquire locks while the watchdog thread reads the same set in
 * {@code watch()}, so access to the set is guarded by its monitor.
 *
 * @param thread    the thread that acquired the lock; ignored when {@code null}
 * @param cacheLock the lock object; ignored when {@code null}
 */
public void onWatchDog(Thread thread, CacheLock cacheLock) {
    if (null == thread || null == cacheLock) {
        return;
    }
    DistLockDogTarget distLockDogTarget = new DistLockDogTarget();
    distLockDogTarget.setLockCallThread(thread);
    distLockDogTarget.setCacheLock(cacheLock);
    synchronized (distLockWatchSet) {
        distLockWatchSet.add(distLockDogTarget);
    }
}

/**
 * Patrol: renews every registered lock and drops the targets that no longer need watching.
 * <p>
 * A target is kept only if its calling thread is still alive, its lock object is still
 * reachable and the renewal succeeded. The renewals run on a snapshot taken under the monitor,
 * so registering threads are not blocked while Redis is being called.
 */
public void watch() {
    List<DistLockDogTarget> snapshot;
    synchronized (distLockWatchSet) {
        snapshot = new ArrayList<>(distLockWatchSet);
    }
    List<DistLockDogTarget> finished = new ArrayList<>();
    for (DistLockDogTarget target : snapshot) {
        if (!renewTarget(target)) {
            finished.add(target);
        }
    }
    if (finished.isEmpty()) {
        return;
    }
    synchronized (distLockWatchSet) {
        for (DistLockDogTarget target : finished) {
            distLockWatchSet.remove(target);
        }
    }
}

/**
 * Renews one watched lock.
 *
 * @param target the watched target
 * @return {@code true} if the target should stay registered
 */
private boolean renewTarget(DistLockDogTarget target) {
    Thread thread = target.getLockCallThread().get();
    if (null == thread || !thread.isAlive()) {
        return false;
    }
    CacheLock cacheLock = target.getCacheLock().get();
    if (null == cacheLock) {
        return false;
    }
    try {
        return cacheLock.renew();
    } catch (Exception e) {
        log.error("缓存锁续租失败:{},{}", thread, cacheLock, e);
        return false;
    }
}
}
看门狗线程
java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 分布式锁看门狗线程
*
* @author Brack.zhu
* @date 2021/1/6
*/
public class LockWatchDog extends Thread {
private boolean stop = false;
private LockWatchDogManager lockWatchDogManager;
private long interval;
private Logger log = LoggerFactory.getLogger(LockWatchDog.class);
/**
 * Creates the watchdog thread, named "Lock Watch Dog Thread". The thread is not started here.
 *
 * @param lockWatchDogManager manager whose {@code watch()} is called on every patrol
 * @param interval            pause between two patrols, in milliseconds
 */
public LockWatchDog(LockWatchDogManager lockWatchDogManager, long interval) {
    super("Lock Watch Dog Thread");
    this.interval = interval;
    this.lockWatchDogManager = lockWatchDogManager;
}
/**
* Requests the watchdog thread to stop.
* <p>
* The stop flag is set first and the thread is interrupted afterwards, so that a sleep pending in
* {@code run()} ends and the loop condition sees the flag.
*/
public void callStop() {
this.stop = true;
this.interrupt();
}
/**
 * Patrol loop: calls {@code LockWatchDogManager.watch()}, sleeps for the configured interval and
 * repeats until a stop has been requested.
 * <p>
 * A runtime failure in one patrol is logged and the loop continues. If the thread died here,
 * no lock would be renewed any more and every held lock would expire.
 */
@Override
public void run() {
    while (!stop) {
        try {
            lockWatchDogManager.watch();
        } catch (RuntimeException e) {
            log.error("分布式锁看门狗巡查异常", e);
        }
        try {
            Thread.sleep(interval);
        } catch (InterruptedException ignored) {
            // callStop() interrupts the sleep; the loop condition decides whether to exit.
        }
    }
    log.warn("分布式锁看门狗线程退出!");
}
}
看门狗模型
java
import com.commnetsoft.core.cache.CacheLock;
import java.lang.ref.WeakReference;
/**
 * Model of one target patrolled by the lock watchdog: a lock object and the thread that
 * acquired it. Both are held through weak references.
 *
 * @author Brack.zhu
 * @date 2021/1/29
 */
public class DistLockDogTarget {

    /**
     * Thread that acquired the lock.
     */
    private WeakReference<Thread> lockCallThread;

    /**
     * The lock object.
     */
    private WeakReference<CacheLock> cacheLock;

    /**
     * @return the weak reference to the thread that acquired the lock
     */
    public WeakReference<Thread> getLockCallThread() {
        return this.lockCallThread;
    }

    /**
     * @param lockCallThread weak reference to the thread that acquired the lock
     */
    public void setLockCallThread(WeakReference<Thread> lockCallThread) {
        this.lockCallThread = lockCallThread;
    }

    /**
     * Convenience setter that wraps the thread in a new weak reference.
     *
     * @param lockCallThread the thread that acquired the lock
     */
    public void setLockCallThread(Thread lockCallThread) {
        WeakReference<Thread> threadRef = new WeakReference<>(lockCallThread);
        setLockCallThread(threadRef);
    }

    /**
     * @return the weak reference to the lock object
     */
    public WeakReference<CacheLock> getCacheLock() {
        return this.cacheLock;
    }

    /**
     * @param cacheLock weak reference to the lock object
     */
    public void setCacheLock(WeakReference<CacheLock> cacheLock) {
        this.cacheLock = cacheLock;
    }

    /**
     * Convenience setter that wraps the lock object in a new weak reference.
     *
     * @param cacheLock the lock object
     */
    public void setCacheLock(CacheLock cacheLock) {
        WeakReference<CacheLock> lockRef = new WeakReference<>(cacheLock);
        setCacheLock(lockRef);
    }
}